package com.wugui.datax.rpc.remoting.invoker;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.wugui.datax.rpc.registry.ServiceRegistry;
import com.wugui.datax.rpc.registry.impl.LocalServiceRegistry;
import com.wugui.datax.rpc.remoting.net.params.BaseCallback;
import com.wugui.datax.rpc.remoting.net.params.XxlRpcFutureResponse;
import com.wugui.datax.rpc.remoting.net.params.XxlRpcResponse;
import com.wugui.datax.rpc.util.XxlRpcException;

/**
 * xxl-rpc invoker factory.
 *
 * <p>Owns the client-side state shared by invokers:
 * <ul>
 *   <li>the {@link ServiceRegistry} instance created in {@link #start()},</li>
 *   <li>the list of callbacks to run in {@link #stop()},</li>
 *   <li>the pool of pending {@link XxlRpcFutureResponse}s keyed by request id,</li>
 *   <li>a lazily created thread pool on which invoke callbacks are executed.</li>
 * </ul>
 *
 * @author xuxueli 2018-10-19
 */
public class XxlRpcInvokerFactory {
	private static final Logger logger = LoggerFactory.getLogger(XxlRpcInvokerFactory.class);

	// ---------------------- default instance ----------------------

	/** Default factory backed by {@link LocalServiceRegistry} with no registry parameters. */
	private static final XxlRpcInvokerFactory instance = new XxlRpcInvokerFactory(LocalServiceRegistry.class, null);

	/**
	 * @return the shared default factory instance
	 */
	public static XxlRpcInvokerFactory getInstance() {
		return instance;
	}

	// ---------------------- config ----------------------

	/** Registry implementation to instantiate reflectively in {@link #start()}; may be {@code null}. */
	private Class<? extends ServiceRegistry> serviceRegistryClass;
	/** Parameters handed to {@link ServiceRegistry#start}; may be {@code null}. */
	private Map<String, String> serviceRegistryParam;

	/**
	 * Creates a factory without a service registry; {@link #start()} will then be a no-op.
	 */
	public XxlRpcInvokerFactory() {
	}

	/**
	 * Creates a factory that will start the given registry implementation.
	 *
	 * @param serviceRegistryClass registry class to instantiate in {@link #start()}, or {@code null} for none
	 * @param serviceRegistryParam parameters passed to the registry's {@code start} method
	 */
	public XxlRpcInvokerFactory(Class<? extends ServiceRegistry> serviceRegistryClass,
			Map<String, String> serviceRegistryParam) {
		this.serviceRegistryClass = serviceRegistryClass;
		this.serviceRegistryParam = serviceRegistryParam;
	}

	// ---------------------- start / stop ----------------------

	/**
	 * Instantiates and starts the configured service registry, if any.
	 *
	 * @throws Exception if the registry cannot be instantiated or fails to start
	 */
	public void start() throws Exception {
		// start registry
		if (serviceRegistryClass != null) {
			// Class.newInstance() is deprecated (it propagates checked constructor
			// exceptions undeclared); go through the no-arg constructor instead.
			serviceRegistry = serviceRegistryClass.getDeclaredConstructor().newInstance();
			serviceRegistry.start(serviceRegistryParam);
		}
	}

	/**
	 * Stops the service registry, runs every registered stop callback and shuts
	 * down the response-callback thread pool.
	 *
	 * <p>Callbacks and the thread pool are released even when stopping the
	 * registry fails; the registry failure is still propagated to the caller.
	 * A failing callback is logged and does not prevent the remaining ones from running.
	 *
	 * @throws Exception if stopping the service registry fails
	 */
	public void stop() throws Exception {
		try {
			// stop registry
			if (serviceRegistry != null) {
				serviceRegistry.stop();
			}
		} finally {
			// stop callback
			for (BaseCallback callback : stopCallbackList) {
				try {
					callback.run();
				} catch (Exception e) {
					logger.error(e.getMessage(), e);
				}
			}

			// stop CallbackThreadPool
			stopCallbackThreadPool();
		}
	}

	// ---------------------- service registry ----------------------

	/** Registry created by {@link #start()}; {@code null} until then or when none is configured. */
	private ServiceRegistry serviceRegistry;

	/**
	 * @return the started service registry, or {@code null} if none has been created
	 */
	public ServiceRegistry getServiceRegistry() {
		return serviceRegistry;
	}

	// ---------------------- stop callback ----------------------

	/** Callbacks executed, in registration order, by {@link #stop()}. */
	private final List<BaseCallback> stopCallbackList = new ArrayList<BaseCallback>();

	/**
	 * Registers a callback to be run when this factory is stopped.
	 *
	 * @param callback callback to run in {@link #stop()}
	 */
	public void addStopCallBack(BaseCallback callback) {
		stopCallbackList.add(callback);
	}

	// ---------------------- future-response pool ----------------------

	// XxlRpcFutureResponseFactory

	/** Pending responses keyed by request id. */
	private final ConcurrentMap<String, XxlRpcFutureResponse> futureResponsePool = new ConcurrentHashMap<String, XxlRpcFutureResponse>();

	/**
	 * Registers a pending future under its request id.
	 *
	 * @param requestId      id of the outgoing request
	 * @param futureResponse future to complete when the response arrives
	 */
	public void setInvokerFuture(String requestId, XxlRpcFutureResponse futureResponse) {
		futureResponsePool.put(requestId, futureResponse);
	}

	/**
	 * Removes the pending future registered under the given request id, if any.
	 *
	 * @param requestId id of the request whose future should be discarded
	 */
	public void removeInvokerFuture(String requestId) {
		futureResponsePool.remove(requestId);
	}

	/**
	 * Delivers a response to the future registered under {@code requestId} and
	 * removes that future from the pool. Does nothing if no future is registered.
	 *
	 * <p>If the future carries an invoke callback, the callback is dispatched on
	 * the response-callback thread pool ({@code onFailure} when the response has
	 * an error message, {@code onSuccess} otherwise); a failure to submit the
	 * task is logged. Otherwise the response is set on the future directly.
	 *
	 * @param requestId      id of the request the response belongs to
	 * @param xxlRpcResponse the received response
	 */
	public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse) {

		// Take the future out atomically so that two responses carrying the same
		// request id cannot both be delivered to it.
		final XxlRpcFutureResponse futureResponse = futureResponsePool.remove(requestId);
		if (futureResponse == null) {
			return;
		}

		// notify
		if (futureResponse.getInvokeCallback() != null) {

			// callback type
			try {
				executeResponseCallback(() -> {
					if (xxlRpcResponse.getErrorMsg() != null) {
						futureResponse.getInvokeCallback().onFailure(new XxlRpcException(xxlRpcResponse.getErrorMsg()));
					} else {
						futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult());
					}
				});
			} catch (Exception e) {
				logger.error(e.getMessage(), e);
			}
		} else {

			// other normal type
			futureResponse.setResponse(xxlRpcResponse);
		}

	}

	// ---------------------- response callback ThreadPool ----------------------

	/**
	 * Lazily created pool for invoke callbacks. Must be {@code volatile}: it is
	 * read outside the lock in {@link #executeResponseCallback(Runnable)}.
	 */
	private volatile ThreadPoolExecutor responseCallbackThreadPool = null;

	/**
	 * Runs the given task on the response-callback thread pool, creating the pool
	 * on first use.
	 *
	 * @param runnable task to execute
	 * @throws XxlRpcException if the pool rejects the task
	 */
	public void executeResponseCallback(Runnable runnable) {

		ThreadPoolExecutor pool = responseCallbackThreadPool;
		if (pool == null) {
			synchronized (this) {
				pool = responseCallbackThreadPool;
				if (pool == null) {
					// core 10 threads, max 100 threads, 60s keep-alive, queue capacity 1000
					pool = new ThreadPoolExecutor(10, 100, 60L, TimeUnit.SECONDS,
							new LinkedBlockingQueue<>(1000),
							r -> new Thread(r,
									"xxl-rpc, XxlRpcInvokerFactory-responseCallbackThreadPool-" + r.hashCode()),
							(r, executor) -> {
								throw new XxlRpcException("xxl-rpc Invoke Callback Thread pool is EXHAUSTED!");
							});
					responseCallbackThreadPool = pool;
				}
			}
		}
		pool.execute(runnable);
	}

	/**
	 * Initiates an orderly shutdown of the response-callback thread pool if it
	 * has been created.
	 */
	public void stopCallbackThreadPool() {
		ThreadPoolExecutor pool = responseCallbackThreadPool;
		if (pool != null) {
			pool.shutdown();
		}
	}

}
